Stars and wiki

Get wiki page views and scrape Wikipedia, using the HYG dataset for star locations. This is a summary notebook with the not-so-interesting bits removed (see the projections notebook for azimuthal projections etc.)

In [2]:
import pandas as pd
import numpy as np
import wikitextparser as wtp
import requests, re, csv, pickle, json
from collections import defaultdict

from plotly.offline import iplot, init_notebook_mode
import plotly.graph_objs as go
init_notebook_mode()
import plotly.io as pio
In [6]:
%%javascript
// Hack to fix HTML export: register CDN paths with require.js so that
// plotly (and its d3/jquery dependencies) resolve in the exported notebook.
require.config({
  paths: {
    // note: require.js appends ".js" itself, hence the extension-less URLs
    d3: 'https://cdnjs.cloudflare.com/ajax/libs/d3/5.9.2/d3',
    jquery: 'https://code.jquery.com/jquery-3.4.1.min',
    plotly: 'https://cdn.plot.ly/plotly-latest.min'
  },

  shim: {
    // plotly is not an AMD module: declare its dependencies and global export
    plotly: {
      deps: ['d3', 'jquery'],
      exports: 'plotly'
    }
  }
});
In [ ]:
## the class to parse wiki.
class WikicatParser():
    """
    Gets all the pages recursively within a category, parses the content (via a supplied function) and gets pageviews.
    >>>pages = WikicatParser(cat_name, custom_page_parser=my_function, extra_fields=[], forbidden_categories_keywords=[...]).get_pages_recursively()
    >>>print(pages.data)
    >>>pandas.DataFrame.from_records(list(pages.data.values()))
    custom_page_parser is for content mining: a function that, given wiki text, returns a dictionary of whatever it mined.
    Any extra fields need to be added to extra_fields or to_csv will fail.

    .get_pages_recursively gets everything downwards. Do note that .forbidden_categories_keywords may need to be set.
    This calls both .get_pages and .get_subcategories, both of which actually call .get_members which calls .get, which is the web fetcher.
    .get_pageviews gets the page views.
    """
    api = "https://en.wikipedia.org/w/api.php"

    def __init__(self, category,
                 no_views=False,
                 no_content=False,
                 custom_page_parser=None,
                 wanted_templates=None,
                 extra_fields=None,
                 forbidden_categories_keywords=None):
        """
        :param category: category name, with or without the 'Category:' prefix
        :param no_views: if True, do not fetch pageview counts
        :param no_content: if True, do not fetch or parse page content
        :param custom_page_parser: function(wikitext) -> dict of mined fields; takes precedence over wanted_templates
        :param wanted_templates: template names whose arguments get flattened into each page's record
        :param extra_fields: extra column names (added by the parser) for to_csv to write
        :param forbidden_categories_keywords: str or list of str; subcategories whose title
            contains one of these (case-insensitively) are skipped during recursion
        """
        self.session = requests.Session()
        self.no_views = no_views
        self.no_content = no_content
        self.data = {}  # page title -> record (categorymember fields + category, views, parsed fields)
        if 'Category:' not in category:
            self.category = 'Category:' + category
        else:
            self.category = category
        self.category_map = {}  # category title -> list of subcategory titles
        self.category_cleaned = category.replace(' ', '_').replace('Category:', '')
        if custom_page_parser:
            self.page_parser = custom_page_parser
        elif wanted_templates:
            self.wanted_templates = wanted_templates
            self.page_parser = self.parse_templates
        else:
            # nothing to mine: skip content fetching altogether
            self.no_content = True
            self.page_parser = lambda text: {}
        if extra_fields:
            self.extra_fields = extra_fields
        else:
            self.extra_fields = []
        if forbidden_categories_keywords:
            if isinstance(forbidden_categories_keywords, str):
                # bugfix: previously read self.forbidden_categories_keywords before it was
                # assigned, raising AttributeError whenever a plain string was passed.
                self.forbidden_categories_keywords = [forbidden_categories_keywords.lower()]
            else:
                self.forbidden_categories_keywords = [k.lower() for k in forbidden_categories_keywords]
        else:
            self.forbidden_categories_keywords = []

    def get(self, params):
        """
        Fetch data from the MediaWiki API, transparently following 'continue'
        pagination (recursively) and concatenating the paged lists into one response.
        """
        data = self.session.get(url=self.api, params=params).json()
        if 'continue' in data:
            params['cmcontinue'] = data['continue']['cmcontinue']
            t = list(data['query'].keys())[0]  # e.g. 'categorymembers'
            new_data = self.get(params)
            new_data['query'][t] = [*data['query'][t], *new_data['query'][t]]
            data = new_data
        return data

    def _add_datum(self, data, cat):
        """Merge a list of categorymember dicts into self.data, fetching views/content for new pages."""
        for d in data:
            name = d["title"]
            if name not in self.data:
                self.data[name] = d
                self.data[name]['category'] = cat
                if not self.no_views:
                    self.data[name]['views'] = self.get_pageviews(name)
                if not self.no_content:
                    wiki = self.get_content(name)
                    for key, value in self.page_parser(wiki).items():
                        self.data[name][key] = value
            else:
                # already seen via another category: just record the extra membership
                self.data[name]["category"] += '|' + cat

    def get_subcategories(self, cat):
        """Return the subcategories of cat, skipping any whose title contains a forbidden keyword."""
        subcats = []
        for subcat in self.get_members(cat, 'subcat'):
            for k in self.forbidden_categories_keywords:
                if k in subcat['title'].lower():
                    print(f'BAN: {subcat["title"]} removed because it contained {k}')
                    break
            else:  # no forbidden keyword matched (for/else)
                subcats.append(subcat)
        self.category_map[cat] = [s['title'] for s in subcats]
        return subcats

    def get_page_by_name(self, name, cat='Manual'):
        # gets the page by the name specified! This is a fix for pages missed by the crawl.
        self._add_datum([{'title': name}], cat)

    def get_pages(self, cat):
        # gets all the pages within the category (non-recursively)
        return self.get_members(cat, 'page')

    def get_members(self, cat, cmtype='subcat|page'):
        """Fetch the members of a category (pages and/or subcats) and merge them into self.data."""
        params = {
            'action': "query",
            'list': "categorymembers",
            'cmtitle': cat,
            'cmtype': cmtype,
            'cmdir': "desc",
            'format': "json"
            }
        r = self.get(params)
        if 'query' not in r:
            print(f'{cat} replied with {str(r)}.')
            return []
        data = r['query']['categorymembers']
        self._add_datum(data, cat)
        return data

    def get_pages_recursively(self, cat=None):
        """Depth-first crawl: pages of cat plus everything in its (non-forbidden) subcategories."""
        if cat is None:
            cat = self.category
        subcats = [s['title'] for s in self.get_subcategories(cat)]
        data = self.get_pages(cat)
        for c in subcats:
            ndata = self.get_pages_recursively(c)
            print(f'{c} has {len(data)} pages directly and {len(ndata)} in subcategories')
            data.extend(ndata)
        return data

    def get_pageviews(self, page):
        """Mean daily views over one year (2018-06 to 2019-06); returns the string 'NA' on error."""
        url = f"https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia/all-access/all-agents/{page.replace(' ','_').replace('/','%2F')}/monthly/2018060100/2019060100"
        r = self.session.get(url).json()
        if 'items' in r:
            # monthly buckets summed, then averaged per day
            return sum([i['views'] for i in r['items']])/365
        else:
            print('error', page, r)
            return 'NA'

    def get_content(self, page):
        """Fetch the wikitext of section 0 (the lead, where the infobox templates live)."""
        params = {
            'action': "query",
            'prop': 'revisions',
            'rvprop': 'content',
            'rvsection': 0,
            'titles': page,
            'format': "json"
            }
        data = self.session.get(url=self.api, params=params).json()
        pageid = list(data['query']['pages'].keys())[0]
        wikimarkup = data['query']['pages'][pageid]['revisions'][0]['*']
        return wikimarkup.encode('utf-8', 'ignore').decode('unicode_escape', 'ignore')  # not quite right

    def to_csv(self):
        """Don't save as csv for storage. Save as pickle. This is just for casual inspection in Excel."""
        with open(f'{self.category_cleaned}.csv', 'w', newline='') as w:
            dw = csv.DictWriter(w, ['title', 'category', 'ns', 'views', 'pageid'] + self.extra_fields,
                                extrasaction='ignore')
            dw.writeheader()
            dw.writerows(self.data.values())
        return self

    ####### code to convert template to dictionary
    def parse_templates(self, text):
        """Flatten the arguments of every wanted template in text into one dict."""
        dex = {}
        for t in wtp.parse(text).templates:
            for want in self.wanted_templates:
                if want.lower() in t.normal_name().lower():  # not using t.name as it has trailing space.
                    dex = {**dex, **self._template_to_dict(t)}
        return dex

    def _arg_to_val(self, arg):
        """Clean a template argument's value: resolve nowrap/val, strip markup, dashes and error margins."""
        val = arg.value
        for t in arg.templates:
            if t.arguments:
                tval = t.arguments[0].value
                if t.normal_name() in ('nowrap', 'val'):
                    if any(['ul' in a.name for a in t.arguments]):  # unit!
                        tval += [a.value for a in t.arguments if 'u' in a.name][0]  # u= and ul=
                    val = val.replace(t.string, tval)
        val = re.sub(r'<.*?\/>', '', val)  # remove self closing tags
        val = val.replace('&nbsp;', ' ')
        val = re.sub(r'<.*?>.*?<\/.*?>', '', val)  # remove tags
        val = re.sub(r'<!--.*?-->', '', val)  # remove comments
        val = val.replace('–', '-')  # en dash to hyphen minus
        val = val.replace('—', '-')  # em dash to hyphen minus (bugfix: was a duplicated en dash replace)
        val = re.sub(r'±\s+\d+\.?\d*', '', val)  # clear error margin for safety
        val = val.strip()
        return val

    def _arg_to_key(self, arg):
        """Template argument name with surrounding whitespace stripped."""
        return arg.name.strip()

    def _template_to_dict(self, template):
        """Map cleaned argument names to cleaned argument values."""
        return {self._arg_to_key(arg): self._arg_to_val(arg) for arg in template.arguments}
In [ ]:
# get the data: crawl the luminosity-class category tree, mining the Starbox template.
stars = WikicatParser('Category:Stars by luminosity class', wanted_templates=['Starbox'],
                      forbidden_categories_keywords=['Sun'])
stars.get_pages_recursively()
## save before sending to pd as something might get munted.
# fix: use context managers so the dump files are flushed and closed
# (the originals left the handles open).
with open('wiki_stars.dict.p', 'wb') as fh:
    pickle.dump(stars.data, fh)
with open('wiki_stars.json', 'w') as fh:
    json.dump(stars.data, fh)

Star coordinates

I don't 100% trust the numbers on wiki as they are written by humans. Take Procyon... which had a wrong HIP and HD.

In [3]:
### file downloaded online from 
# http://www.astronexus.com/files/downloads/hygdata_v3.csv.gz
# field defs are there.


hyg = pd.read_csv('hygdata_v3.csv')
hyg = hyg.drop([0],axis=0) # drop row 0, which is the Sun ("dim the sun")
hyg = hyg.loc[~hyg.ra.isna()] # keep only rows with a right ascension
def get_name(row):
    """Best available designation for a HYG row: proper name, then Bayer/Flamsteed,
    then HD, Gliese and HIP catalogue numbers, falling back to '???'."""
    if isinstance(row.proper, str):
        return row.proper
    if isinstance(row.bf, str):
        return row.bf
    # catalogue numbers are floats in the CSV; NaN fails the > 0 test
    if isinstance(row.hd, float) and row.hd > 0:
        return 'HD ' + str(int(row.hd))
    if isinstance(row.gl, float) and row.gl > 0:
        return f'Gliese{row.gl}'
    if isinstance(row.hip, float) and row.hip > 0:
        return f'HIP{row.hip}'
    return '???'
    

def _ra_hours_to_degrees(ra_hours):
    """Right ascension in hours -> degrees; ra == 0 maps to 360 (original branch behaviour)."""
    degrees = ra_hours / 24 * 360
    return degrees if ra_hours > 0 else 360 + degrees

# polar coordinates for the azimuthal projection, a display name, and a
# brightness scale s derived from the magnitude (smaller mag -> larger s)
hyg = hyg.assign(az_rho=hyg.dec.apply(lambda dec: np.pi / 2 - dec),
                 az_theta=hyg.ra.apply(_ra_hours_to_degrees),
                 named=hyg.apply(get_name, axis=1).astype(str))

hyg = hyg.assign(s=1 / (2.5 ** hyg.mag))

import json, re

# map HD catalogue number -> mean daily wiki views, keyed from the scraped records
wiki = json.load(open('wiki_stars.json', 'r')).values()
whd = {}

for star in wiki:
    match = re.search('HD.*?(\d+)', str(star).replace('\n', ''))
    if not match:
        continue
    hd = int(match.group(1))
    if star['title'] == 'Procyon':
        # Procyon's scraped HD number is wrong: pin the correct one manually
        whd[61421] = star['views']
    elif hd in whd:
        print(f'Duplication for HD {hd} —{star["title"]}')
    else:
        whd[hd] = star['views']
                  
def get_views(hd):
    """Mean daily wiki views for an HD number, or None when the number is
    missing/unparseable (e.g. NaN) or the views value is not numeric ('NA')."""
    try:
        key = int(hd)
    except Exception:
        return None
    if key not in whd:
        return None
    try:
        return float(whd[key])
    except Exception:
        return None
    
hyg = hyg.assign(wikiviews=hyg.hd.apply(get_views))

## fix the broken ones.
# .at on the first matching index: manual patches for rows the HD join got wrong
hyg.at[hyg.loc[hyg.hip == 94311].index.values[0],'wikiviews'] = 30 #19 Lyr is a new article.
hyg.at[hyg.loc[hyg.hd == 58061].index.values[0],'named'] = 'CY CMa'
hyg.at[hyg.loc[hyg.hd == 168442].index.values[0],'named'] = 'Gliese 710'
hyg.at[hyg.loc[hyg.hd == 140283].index.values[0],'named'] = 'Methuselah star'
hyg.at[hyg.loc[hyg.hd == 304043].index.values[0],'named'] = 'Innes\'s star'
Duplication for HD 85512 —HD 85512
Duplication for HD 40307 —HD 40307
Duplication for HD 85512 —Template:HD 85512
Duplication for HD 85512 —HD 85512 b
Duplication for HD 40307 —HD 40307 g
Duplication for HD 40307 —HD 40307 f
Duplication for HD 40307 —HD 40307 e
Duplication for HD 40307 —HD 40307 d
Duplication for HD 40307 —HD 40307 c
Duplication for HD 40307 —HD 40307 b
Duplication for HD 40307 —Template:HD 40307
Duplication for HD 202206 —HD 202206
Duplication for HD 5388 —HD 5388
Duplication for HD 149382 —HD 149382
Duplication for HD 136118 —HD 136118
Duplication for HD 214993 —12 Lacertae
Duplication for HD 98219 —HD 98219
Duplication for HD 2 —Epsilon Phoenicis
Duplication for HD 2 —Gamma2 Normae
Duplication for HD 77557 —78 Cancri
Duplication for HD 97950 —WR 42e

Plot

In [4]:
## mag: one white scatter trace, marker area proportional to brightness s
shyg = hyg.loc[hyg.mag < 7]  # roughly naked-eye visible
m = max(shyg.s)
# per-constellation frames, kept for the commented-out per-constellation variant
constellations = [shyg.loc[shyg.con == c] for c in set(shyg.con) if c]
mag_marker = {'size': shyg.s,
              'sizeref': 2. * max(hyg.s) / (30. ** 2),
              'sizemode': 'area',
              'color': 'white'}
magscatters = [go.Scattergl(x=shyg.ra,
                            y=shyg.dec,
                            text=shyg.named,
                            mode='markers',
                            marker=mag_marker)]


## wiki: same scatter, but marker area proportional to daily wiki views
shyg = hyg.loc[~hyg.wikiviews.isna()]
shyg = shyg.loc[shyg.wikiviews > 0.1]  # drop essentially unviewed stars
m = max(shyg.wikiviews)

wiki_marker = {'size': shyg.wikiviews.values,
               'sizeref': 2 * m / (30. ** 2),
               'sizemode': 'area',
               'color': 'white'}
wikiscatters = [go.Scattergl(x=shyg.ra,
                             y=shyg.dec,
                             text=shyg.named,
                             mode='markers',
                             marker=wiki_marker)]

def make_annotations(rows, color, ax=-20, ay=-20):
    """One labelled arrow per star row (ra/dec in data coordinates).

    :param rows: DataFrame slice with ra, dec and named columns
    :param color: text and arrow colour
    :param ax, ay: label offset in pixels
    """
    return [go.layout.Annotation(x=row.ra,
                                 y=row.dec,
                                 xref="x",
                                 yref="y",
                                 text=row.named,
                                 ax=ax,
                                 ay=ay,
                                 font=dict(size=10, color=color),
                                 arrowcolor=color)
            for i, row in rows.iterrows()]

# deduplicated: the four annotation lists below were copy-pasted comprehensions
bright = hyg.loc[(hyg.mag < 1.5)]
mag_annotations = make_annotations(bright, "skyblue")

# bright AND wiki-popular
wiki_bright = hyg.loc[(hyg.wikiviews > 200) & (hyg.mag < 1.5)]
wiki_bright_annotations = make_annotations(wiki_bright, "skyblue")

# bright but wiki-obscure
unwiki_bright = hyg.loc[(hyg.wikiviews < 200) & (hyg.mag < 1.5)]
unwiki_bright_annotations = make_annotations(unwiki_bright, "lightcoral")

# dim but wiki-popular; offset the other way to reduce label clashes
wiki_dim = hyg.loc[(hyg.wikiviews > 100) & (hyg.mag > 1.5)]
wiki_dim_annotations = make_annotations(wiki_dim, "lime", ax=20, ay=20)


common = 'Equirectangular projection (plate carrée) of all stars:'
layout = {'title': f'{common} Magnitude',
          'showlegend': False,
          'yaxis': {'range': [-90, 90]},  # declination in degrees
          'xaxis': {'range': [0, 24]},  # right ascension in hours
          'annotations': mag_annotations,
          'plot_bgcolor': 'black'
         }

#frames = [go.Frame(data=wikiscatters, layout={'title': f'{common} wiki views'})]

fig = go.Figure(data=magscatters, layout = layout)
iplot(fig, image='png', filename='mag.png', image_width=1280, image_height=1280)
pio.write_image(fig, 'mag_label.png')


# NOTE: the same layout dict is mutated and reused for the wiki-views figure
layout['title'] = f'{common} Wiki daily views'
layout['annotations'] = wiki_bright_annotations + unwiki_bright_annotations + wiki_dim_annotations
fig = go.Figure(data=wikiscatters, layout = layout)
iplot(fig)
pio.write_image(fig, 'wiki_label.png')
In [5]:
# magnitude vs log2(daily wiki views), split by celestial hemisphere
northern = hyg.loc[(~hyg.wikiviews.isna()) & (hyg.dec >= 0)]
southern = hyg.loc[(~hyg.wikiviews.isna()) & (hyg.dec < 0)]
hemisphere_traces = [go.Scattergl(x=frame.mag,
                                  y=np.log2(frame.wikiviews),
                                  name=label,
                                  text=frame.named,
                                  mode='markers',
                                  opacity=0.2)
                     for frame, label in ((northern, 'Northern'), (southern, 'Southern'))]
fig = go.Figure(data=hemisphere_traces,
                layout={'title': 'Magnitude vs views',
                        'xaxis': {'title': 'Magnitude'},
                        'yaxis': {'title': 'log2 Wikiviews', 'range': [-0.5, 11]}})
iplot(fig)
In [ ]: